<!DOCTYPE html>
<html lang="zh-cn">
<head>
   
    <link type="text/css" rel="stylesheet" href="/bundles/blog-common.css?v=KOZafwuaDasEedEenI5aTy8aXH0epbm6VUJ0v3vsT_Q1"/>
<link id="MainCss" type="text/css" rel="stylesheet" href="/skins/ThinkInside/bundle-ThinkInside.css?v=RRjf6pEarGnbXZ86qxNycPfQivwSKWRa4heYLB15rVE1"/>
<link type="text/css" rel="stylesheet" href="/blog/customcss/428549.css?v=%2fam3bBTkW5NBWhBE%2fD0lcyJv5UM%3d"/>

</head>
<body>
<a name="top"></a>

<div id="page_begin_html"></div><script>load_page_begin_html();</script>

<div id="topics">
	<div class = "post">
		<h1 class = "postTitle">
			<a id="cb_post_title_url" class="postTitle2" href="https://www.cnblogs.com/frankdeng/p/9311474.html">Hadoop（七）YARN的资源调度</a>
		</h1>
		<div class="clear"></div>
		<div class="postBody">
			<div id="cnblogs_post_body" class="blogpost-body"><h3>一、YARN 概述&nbsp;</h3>
<p>　　YARN 是一个资源调度平台，负责为运算程序提供服务器运算资源，相当于一个分布式的操 作系统平台，而 MapReduce 等运算程序则相当于运行于操作系统之上的应用程序</p>
<p>　　YARN 是 Hadoop2.x 版本中的一个新特性。它的出现其实是为了解决第一代 MapReduce 编程 框架的不足，提高集群环境下的资源利用率，这些资源包括内存，磁盘，网络，IO等。Hadoop2.X 版本中重新设计的这个 YARN 集群，具有更好的扩展性，可用性，可靠性，向后兼容性，以 及能支持除 MapReduce 以外的更多分布式计算程序</p>
<p>　　1、YARN 并不清楚用户提交的程序的运行机制</p>
<p>　　2、YARN 只提供运算资源的调度（用户程序向 YARN 申请资源，YARN 就负责分配资源）</p>
<p>　　3、YARN 中的主管角色叫 ResourceManager</p>
<p>　　4、YARN 中具体提供运算资源的角色叫 NodeManager</p>
<p>　　5、这样一来，YARN 其实就与运行的用户程序完全解耦，就意味着 YARN 上可以运行各种类 型的分布式运算程序（MapReduce 只是其中的一种），比如 MapReduce、Storm 程序，Spark 程序，Tez &hellip;&hellip;</p>
<p>　　6、所以，Spark、Storm 等运算框架都可以整合在 YARN 上运行，只要他们各自的框架中有 符合 YARN 规范的资源请求机制即可</p>
<p>　　7、yarn 就成为一个通用的资源调度平台，从此，企业中以前存在的各种运算集群都可以整 合在一个物理集群上，提高资源利用率，方便数据共享</p>
<h3>二、原 MR框架的不足</h3>
<p>　　1、JobTracker 是集群事务的集中处理点，存在单点故障</p>
<p>　　2、JobTracker 需要完成的任务太多，既要维护 job 的状态又要维护 job 的 task 的状态，造成 过多的资源消耗</p>
<p>　　3、在 TaskTracker 端，用 Map/Reduce Task 作为资源的表示过于简单，没有考虑到 CPU、内 存等资源情况，当把两个需要消耗大内存的 Task 调度到一起，很容易出现 OOM</p>
<p>　　4、把资源强制划分为 Map/Reduce Slot，当只有 MapTask 时，TeduceSlot 不能用；当只有 Reduce Task 时，MapSlot 不能用，容易造成资源利用不足。</p>
<p>　　总结起来就是：</p>
<p>　　　　1、扩展性差</p>
<p>　　　　2、可靠性低</p>
<p>　　　　3、资源利用率低</p>
<p>　　　　4、不支持多种计算框架</p>
<h3>三、新版 YARN 架构的优点</h3>
<p>　　YARN/MRv2 最基本的想法是将原 JobTracker 主要的资源管理和 Job 调度/监视功能分开作为 两个单独的守护进程。有一个全局的 ResourceManager(RM)和每个 Application 有一个 ApplicationMaster(AM)，Application 相当于 MapReduce Job 或者 DAG jobs。ResourceManager 和 NodeManager(NM)组成了基本的数据计算框架。ResourceManager 协调集群的资源利用， 任何 Client 或者运行着的 applicatitonMaster 想要运行 Job 或者 Task 都得向 RM 申请一定的资 源。ApplicatonMaster 是一个框架特殊的库，对于 MapReduce 框架而言有它自己的 AM 实现， 用户也可以实现自己的 AM，在运行的时候，AM 会与 NM 一起来启动和监视 Tasks。</p>
<h3><strong>四</strong>、<strong style="font-size: 1.17em;">Y</strong><strong style="font-size: 1.17em;">arn基本架构</strong></h3>
<p><strong><img src="https://images2018.cnblogs.com/blog/1385722/201805/1385722-20180519232659494-833750482.png" alt="" /></strong></p>
<p class="16">&nbsp;从YARN的架构图来看，它主要由ResourceManager、NodeManager、ApplicationMaster和Container等以下几个组件构成。</p>
<p class="16">1）ResourceManager（RM）</p>
<p class="16">YARN分层结构的本质是ResourceManager。这个实体控制整个集群并管理应用程序向基础计算资源的分配。ResourceManager将各个资源部分（计算、内存、带宽等）精心安排给基础NodeManager（YARN的每节点代理）。ResourceManager还与ApplicationMaster一起分配资源，与NodeManager一起启动和监视它们的基础应用程序。在此上下文中，ApplicationMaster承担了以前的TaskTracker的一些角色，ResourceManager承担了JobTracker&nbsp;的角色。</p>
<p class="16">总的来说，RM有以下作用</p>
<p class="16">（1）处理客户端请求</p>
<p class="16">（2）启动或监控ApplicationMaster</p>
<p class="16">（3）监控NodeManager</p>
<p class="16">（4）资源的分配与调度</p>
<p class="16">2）ApplicationMaster（AM）</p>
<p class="16">ApplicationMaster管理在YARN内运行的每个应用程序实例。ApplicationMaster负责协调来自ResourceManager的资源，并通过NodeManager监视容器的执行和资源使用（CPU、内存等的资源分配）。请注意，尽管目前的资源更加传统（CPU&nbsp;核心、内存），但未来会带来基于手头任务的新资源类型（比如图形处理单元或专用处理设备）。从YARN角度讲，ApplicationMaster是用户代码，因此存在潜在的安全问题。YARN假设ApplicationMaster存在错误或者甚至是恶意的，因此将它们当作无特权的代码对待。</p>
<p class="16">总的来说,AM有以下作用</p>
<p class="16">（1）负责数据的切分</p>
<p class="16">（2）为应用程序申请资源并分配给内部的任务</p>
<p class="16">（3）任务的监控与容错</p>
<p class="16">3）NodeManager（NM）</p>
<p class="16">NodeManager管理YARN集群中的每个节点。NodeManager提供针对集群中每个节点的服务，从监督对一个容器的终生管理到监视资源和跟踪节点健康。MRv1通过插槽管理Map&nbsp;和Reduce任务的执行，而NodeManager管理抽象容器，这些容器代表着可供一个特定应用程序使用的针对每个节点的资源。</p>
<p class="16">&nbsp;总的来说，NM有以下作用</p>
<p class="16">&nbsp;（1）管理单个节点上的资源</p>
<p class="16">&nbsp;（2）处理来自ResourceManager的命令</p>
<p class="16">（3）处理来自ApplicationMaster的命令</p>
<p class="16">4）Container</p>
<p class="16">Container是YARN中的资源抽象，它封装了某个节点上的多维度资源，如内存、CPU、磁盘、网络等，当AM向RM申请资源时，RM为AM返回的资源便是用Container表示的。YARN会为每个任务分配一个Container，且该任务只能使用该Container中描述的资源。</p>
<p class="16">总的来说，Container有以下作用</p>
<p class="16">对任务运行环境进行抽象，封装CPU、内存等多维度的资源以及环境变量、启动命令等任务运行相关的信息</p>
<p class="16">要使用一个YARN集群，首先需要一个包含应用程序的客户的请求。ResourceManager协商一个容器的必要资源，启动一个ApplicationMaster来表示已提交的应用程序。通过使用一个资源请求协议，ApplicationMaster协商每个节点上供应用程序使用的资源容器。执行应用程序时，ApplicationMaster监视容器直到完成。当应用程序完成时，ApplicationMaster从&nbsp;ResourceManager注销其容器，执行周期就完成了。</p>
<p class="16">&nbsp;&nbsp;通过上面的讲解，应该明确的一点是，旧的Hadoop架构受到了JobTracker的高度约束，JobTracker负责整个集群的资源管理和作业调度。新的YARN架构打破了这种模型，允许一个新ResourceManager管理跨应用程序的资源使用，ApplicationMaster负责管理作业的执行。这一更改消除了一处瓶颈，还改善了将Hadoop集群扩展到比以前大得多的配置的能力。此外，不同于传统的MapReduce，YARN允许使用MPI( Message Passing Interface)&nbsp;等标准通信模式，同时执行各种不同的编程模型，包括图形处理、迭代式处理、机器学习和一般集群计算。</p>
<h3><strong>五</strong>、<strong>Y</strong><strong>arn工作机制</strong></h3>
<p class="16">1）Yarn运行机制</p>
<p>&nbsp;<img src="https://images2018.cnblogs.com/blog/1385722/201805/1385722-20180519233045079-1109533474.png" alt="" /></p>
<p class="16">2）工作机制详解</p>
<p>（0）Mr程序提交到客户端所在的节点</p>
<p>（1）Yarnrunner向Resourcemanager申请一个Application。</p>
<p>（2）rm将该应用程序的资源路径返回给yarnrunner</p>
<p>（3）该程序将运行所需资源提交到HDFS上</p>
<p>（4）程序资源提交完毕后，申请运行mrAppMaster</p>
<p>（5）RM将用户的请求初始化成一个task</p>
<p>（6）其中一个NodeManager领取到task任务。</p>
<p>（7）该NodeManager创建容器Container，并产生MRAppmaster</p>
<p>（8）Container从HDFS上拷贝资源到本地</p>
<p>（9）MRAppmaster向RM 申请运行maptask容器</p>
<p>（10）RM将运行maptask任务分配给另外两个NodeManager，另两个NodeManager分别领取任务并创建容器。</p>
<p>（11）MR向两个接收到任务的NodeManager发送程序启动脚本，这两个NodeManager分别启动maptask，maptask对数据分区排序。</p>
<p>（12）MRAppmaster向RM申请2个容器，运行reduce task。</p>
<p>（13）reduce&nbsp;task向maptask获取相应分区的数据。</p>
<p>（14）程序运行完毕后，MR会向RM注销自己。</p>
<h3>六、<strong>资源调度器</strong></h3>
<p>目前，Hadoop作业调度器主要有三种：FIFO、Capacity Scheduler和Fair Scheduler。Hadoop2.7.2默认的资源调度器是Capacity Scheduler。</p>
<p>具体设置详见：yarn-default.xml文件</p>
<table border="1" cellspacing="0">
<tbody>
<tr>
<td valign="top" width="568">
<p>&lt;property&gt;</p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;&lt;description&gt;The class to use as the resource scheduler.&lt;/description&gt;</p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;&lt;name&gt;yarn.resourcemanager.scheduler.class&lt;/name&gt;</p>
<p>&lt;value&gt;org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler&lt;/value&gt;</p>
<p>&lt;/property&gt;</p>
</td>
</tr>
</tbody>
</table>
<p>1）先进先出调度器（FIFO）</p>
<p>　　FIFO是Hadoop中默认的调度器，也是一种批处理调度器。它先按照作业的优先级高低，再按照到达时间的先后选择被执行的作业。原理图如下所示。</p>
<p>&nbsp;<img src="https://images2018.cnblogs.com/blog/1385722/201805/1385722-20180519233232633-1257419711.png" alt="" /></p>
<p>&nbsp; 比如，一个TaskTracker正好有一个空闲的slot，此时FIFO调度器的队列已经排好序，就选择排在最前面的任务job1，job1包含很多map task和reduce task。假如空闲资源是map slot，我们就选择job1中的map task。假如map task0要处理的数据正好存储在该TaskTracker&nbsp;节点上，根据数据的本地性，调度器把map task0分配给该TaskTracker。FIFO&nbsp;调度器整体就是这样一个过程。</p>
<p>2）容量调度器（Capacity Scheduler）</p>
<p>支持多个队列，每个队列可配置一定的资源量，每个队列采用FIFO调度策略，为了防止同一个用户的作业独占队列中的资源，该调度器会对同一用户提交的作业所占资源量进行限定。调度时，首先按以下策略选择一个合适队列：计算每个队列中正在运行的任务数与其应该分得的计算资源之间的比值，选择一个该比值最小的队列；然后按以下策略选择该队列中一个作业：按照作业优先级和提交时间顺序选择，同时考虑用户资源量限制和内存限制。 原理图如下所示。</p>
<p>&nbsp;<img src="https://images2018.cnblogs.com/blog/1385722/201805/1385722-20180519233332495-1992933232.png" alt="" /></p>
<p>&nbsp; &nbsp;比如我们分为三个队列：queueA、queueB和queueC，每个队列的&nbsp;job&nbsp;按照到达时间排序。假如这里有100个slot，queueA分配20%的资源，可配置最多运行15个task，queueB&nbsp;分配50%的资源，可配置最多运行25个task，queueC分配30%的资源，可配置最多运行25个task。这三个队列同时按照任务的先后顺序依次执行，比如，job11、job21和job31分别排在队列最前面，是最先运行，也是同时运行。</p>
<p>3）公平调度器（Fair Scheduler）</p>
<p>同计算能力调度器类似，支持多队列多用户，每个队列中的资源量可以配置，同一队列中的作业公平共享队列中所有资源。原理图如下所示</p>
<p><img src="https://images2018.cnblogs.com/blog/1385722/201805/1385722-20180519233428834-1643102071.png" alt="" /></p>
<p>比如有三个队列：queueA、queueB和queueC，每个队列中的&nbsp;job&nbsp;按照优先级分配资源，优先级越高分配的资源越多，但是每个&nbsp;job&nbsp;都会分配到资源以确保公平。在资源有限的情况下，每个&nbsp;job&nbsp;理想情况下获得的计算资源与实际获得的计算资源存在一种差距， 这个差距就叫做缺额。在同一个队列中，job的资源缺额越大，越先获得资源优先执行。作业是按照缺额的高低来先后执行的，而且可以看到上图有多个作业同时运行。</p>
<h3><strong>七</strong>、<strong>作业提交全</strong><strong>过程</strong></h3>
<p>1）作业提交过程之YARN</p>
<p>&nbsp;<img src="https://images2018.cnblogs.com/blog/1385722/201805/1385722-20180519233539253-1172351940.png" alt="" />&nbsp;</p>
<p>2）作业提交过程之MapReduce</p>
<p>&nbsp;<img src="https://images2018.cnblogs.com/blog/1385722/201805/1385722-20180519233652166-607777814.png" alt="" /></p>
<p>3）作业提交过程之读数据</p>
<p>&nbsp;<img src="https://images2018.cnblogs.com/blog/1385722/201805/1385722-20180519233733403-453118474.png" alt="" /></p>
<p>4）作业提交过程之写数据</p>
<p>&nbsp;<img src="https://images2018.cnblogs.com/blog/1385722/201805/1385722-20180519233817770-1628859834.png" alt="" /></p>
<p>5）作业提交详解</p>
<p>（1）作业提交</p>
<p>client调用job.waitForCompletion方法，向整个集群提交MapReduce作业&nbsp;(第1步)&nbsp;。 新的作业ID(应用ID)由资源管理器分配(第2步)。作业的client核实作业的输出,&nbsp;计算输入的split,&nbsp;将作业的资源(包括Jar包,&nbsp;配置文件, split信息)拷贝给HDFS(第3步)。最后,&nbsp;通过调用资源管理器的submitApplication()来提交作业(第4步)。</p>
<p>（2）作业初始化</p>
<p>当资源管理器收到submitApplciation()的请求时,&nbsp;就将该请求发给调度器(scheduler),&nbsp;调度器分配container,&nbsp;然后资源管理器在该container内启动应用管理器进程,&nbsp;由节点管理器监控(第5步)。</p>
<p>MapReduce作业的应用管理器是一个主类为MRAppMaster的Java应用。其通过创造一些bookkeeping对象来监控作业的进度,&nbsp;得到任务的进度和完成报告(第6步)。然后其通过分布式文件系统得到由客户端计算好的输入split(第7步)。然后为每个输入split创建一个map任务,&nbsp;根据mapreduce.job.reduces创建reduce任务对象。</p>
<p>（3）任务分配</p>
<p>如果作业很小,&nbsp;应用管理器会选择在其自己的JVM中运行任务。</p>
<p>如果不是小作业,&nbsp;那么应用管理器向资源管理器请求container来运行所有的map和reduce任务(第8步)。这些请求是通过心跳来传输的,&nbsp;包括每个map任务的数据位置,&nbsp;比如存放输入split的主机名和机架(rack)。调度器利用这些信息来调度任务,&nbsp;尽量将任务分配给存储数据的节点,&nbsp;或者分配给和存放输入split的节点相同机架的节点。</p>
<p>（4）任务运行</p>
<p>当一个任务由资源管理器的调度器分配给一个container后,&nbsp;应用管理器通过联系节点管理器来启动container(第9步)。任务由一个主类为YarnChild的Java应用执行。在运行任务之前首先本地化任务需要的资源,&nbsp;比如作业配置, JAR文件,&nbsp;以及分布式缓存的所有文件(第10步)。最后,&nbsp;运行map或reduce任务(第11步)。</p>
<p>YarnChild运行在一个专用的JVM中,&nbsp;但是YARN不支持JVM重用。</p>
<p>（5）进度和状态更新</p>
<p>YARN中的任务将其进度和状态(包括counter)返回给应用管理器,&nbsp;客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新,&nbsp;展示给用户。</p>
<p>（6）作业完成</p>
<p>除了向应用管理器请求作业进度外,&nbsp;客户端每5分钟都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。作业完成之后,&nbsp;应用管理器和container会清理工作状态, OutputCommiter的作业清理方法也会被调用。作业的信息会被作业历史服务器存储以备之后用户核查。</p>
<h3><strong>八</strong>、<strong>任务</strong><strong>的</strong><strong>推测</strong><strong>执行</strong></h3>
<p>1）作业完成时间取决于最慢的任务完成时间</p>
<p>一个作业由若干个Map任务和Reduce任务构成。因硬件老化、软件Bug等，某些任务可能运行非常慢。</p>
<p>典型案例：系统中有99%的Map任务都完成了，只有少数几个Map老是进度很慢，完不成，怎么办？</p>
<p>2）推测执行机制：</p>
<p>发现拖后腿的任务，比如某个任务运行速度远慢于任务平均速度。为拖后腿任务启动一个备份任务，同时运行。谁先运行完，则采用谁的结果。</p>
<p>3）执行推测任务的前提条件</p>
<p>（1）每个task只能有一个备份任务；</p>
<p>（2）当前job已完成的task必须不小于0.05（5%）</p>
<p>（3）开启推测执行参数设置。Hadoop2.7.2 mapred-site.xml文件中默认是打开的。</p>
<table border="1" cellspacing="0">
<tbody>
<tr>
<td valign="top" width="510">
<p>&lt;property&gt;</p>
<p>&nbsp;&nbsp;&lt;name&gt;mapreduce.map.speculative&lt;/name&gt;</p>
<p>&nbsp;&nbsp;&lt;value&gt;true&lt;/value&gt;</p>
<p>&nbsp;&nbsp;&lt;description&gt;If true, then multiple instances of some map tasks</p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;may be executed in parallel.&lt;/description&gt;</p>
<p>&lt;/property&gt;</p>
<p>&nbsp;</p>
<p>&lt;property&gt;</p>
<p>&nbsp;&nbsp;&lt;name&gt;mapreduce.reduce.speculative&lt;/name&gt;</p>
<p>&nbsp;&nbsp;&lt;value&gt;true&lt;/value&gt;</p>
<p>&nbsp;&nbsp;&lt;description&gt;If true, then multiple instances of some reduce tasks</p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;may be executed in parallel.&lt;/description&gt;</p>
<p>&lt;/property&gt;</p>
</td>
</tr>
</tbody>
</table>
<p>4）不能启用推测执行机制情况</p>
<p>&nbsp;&nbsp;&nbsp;（1）任务间存在严重的负载倾斜；</p>
<p>&nbsp;&nbsp;&nbsp;（2）特殊任务，比如任务向数据库中写数据。</p>
<p>5）算法原理：</p>
<p>假设某一时刻，任务T的执行进度为progress，则可通过一定的算法推测出该任务的最终完成时刻estimateEndTime。另一方面，如果此刻为该任务启动一个备份任务，则可推断出它可能的完成时刻estimateEndTime`,于是可得出以下几个公式：</p>
<p>estimateEndTime=estimatedRunTime+taskStartTime</p>
<p>estimatedRunTime=(currentTimestamp-taskStartTime)/progress</p>
<p>estimateEndTime`= currentTimestamp+averageRunTime</p>
<p>其中，</p>
<p>currentTimestamp为当前时刻；</p>
<p>taskStartTime为该任务的启动时刻；</p>
<p>averageRunTime为已经成功运行完成的任务的平均运行时间；</p>
<p>progress是任务运行的比例（0.1-1）；</p>
<p>这样，MRv2总是选择（estimateEndTime- estimateEndTime&middot;）差值最大的任务，并为之启动备份任务。为了防止大量任务同时启动备份任务造成的资源浪费，MRv2为每个作业设置了同时启动的备份任务数目上限。</p>
<p>推测执行机制实际上采用了经典的优化算法：以空间换时间，它同时启动多个相同任务处理相同的数据，并让这些任务竞争以缩短数据处理时间。显然，这种方法需要占用更多的计算资源。在集群资源紧缺的情况下，应合理使用该机制，争取在多用少量资源的情况下，减少作业的计算时间。</p></div>

</body>
</html>
